package drds.plus.executor.cursor.cursor.impl.merge;


import com.google.common.base.Joiner;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.cursor.column_index_mapping.ColumnIndexInTemplateCursorMetaDataToColumnIndexInRequestCursorMetaDataConvertor;
import drds.plus.executor.cursor.cursor.IMergeCursor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.impl.DuplicateValueRowDataLinkedList;
import drds.plus.executor.cursor.cursor.impl.SortingCursor;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaData;
import drds.plus.executor.data_node_executor.DataNodeExecutorContext;
import drds.plus.executor.record_codec.RecordCodec;
import drds.plus.executor.record_codec.RecordCodecFactory;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.ObjectCreateFactory;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.ExecutePlan;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.MergeQuery;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.QueryWithIndex;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.Item;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.column.Column;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.*;
import drds.plus.sql_process.abstract_syntax_tree.expression.order_by.OrderBy;
import drds.plus.sql_process.abstract_syntax_tree.node.query.TableQueryWithIndex;
import drds.plus.sql_process.optimizer.OptimizerContext;
import drds.plus.sql_process.utils.DnfFilters;
import drds.plus.sql_process.utils.Filters;
import drds.plus.sql_process.utils.OptimizerUtils;
import drds.plus.util.GeneralUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.Map.Entry;

@Slf4j
public class MergeCursor extends SortingCursor implements IMergeCursor {

    protected final MergeQuery mergeQuery;
    protected final ExecuteContext executeContext;
    protected List<ISortingCursor> sortingCursorList;
    protected int sizeLimination = 10000;
    protected ColumnIndexInTemplateCursorMetaDataToColumnIndexInRequestCursorMetaDataConvertor columnIndexInTemplateCursorMetaDataToColumnIndexInRequestCursorMetaDataConvertor = new ColumnIndexInTemplateCursorMetaDataToColumnIndexInRequestCursorMetaDataConvertor();
    protected int currentIndex = 0;
    List<RuntimeException> exceptionsWhenCloseSubCursor = new ArrayList();
    private List<ColumnMetaData> returnColumnMetaDataList = null;

    public MergeCursor(ExecuteContext executeContext, MergeQuery mergeQuery, List<ISortingCursor> sortingCursorList) {
        super(null, null, null);

        this.mergeQuery = mergeQuery;
        this.executeContext = executeContext;
        this.sortingCursorList = sortingCursorList;
        List<OrderBy> orderByList = this.sortingCursorList.get(0).getOrderByList();
        setOrderByList(orderByList);
    }

    public MergeCursor(ExecuteContext executeContext, MergeQuery mergeQuery, List<ISortingCursor> sortingCursorList, List<OrderBy> orderByList, CursorMetaData cursorMetaData) {
        super(null, cursorMetaData, orderByList);
        this.sortingCursorList = sortingCursorList;

        this.mergeQuery = mergeQuery;
        this.executeContext = executeContext;
        setOrderByList(orderByList);
    }

    protected void init() {
        if (this.inited) {
            return;
        }
        currentIndex = 0;
        super.init();
    }

    public RowValues next() {
        init();
        // undecided 的话，分库并执行子节点，否则无法next
        if (!this.mergeQuery.isSharded() && this.sortingCursorList.isEmpty()) {
            this.shardAndExecuteSubQuery();
        }
        /**
         * <pre>
         * 因为subCursor和first Cursor的meta数据可能排列的顺序不一样。
         * 比如，cursor1 ,顺序可能是pk , Name.而cursor 2 ,顺序却是反过来的 ， Name , pk
         * 这时候在这里需要统一Cursor内的meta信息才可以。
         * </pre>
         */
        RowValues rowData = innerNext();
        return rowData;
    }

    private void shardAndExecuteSubQuery() {
        QueryWithIndex queryWithIndex = (QueryWithIndex) mergeQuery.getExecutePlan();
        TableQueryWithIndex tableQueryWithIndex = (TableQueryWithIndex) OptimizerUtils.convertExecutePlanToNode(queryWithIndex);
        tableQueryWithIndex.build();
        ExecutePlan executePlan = OptimizerContext.getOptimizerContext().getOptimizer().optimizeNodeAndToExecutePlanAndOptimizeExecutePlan(tableQueryWithIndex, executeContext.getParameters(), executeContext.getExtraCmds());
        ISortingCursor cursor = DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().execute(executeContext, executePlan);
        this.sortingCursorList.add(cursor);
    }

    private RowValues innerNext() {
        init();
        RowValues rowData;
        while (true) {
            if (currentIndex >= sortingCursorList.size()) {// 取尽所有cursor.
                return null;
            }
            ISortingCursor schematicCursor = sortingCursorList.get(currentIndex);
            rowData = schematicCursor.next();
            if (rowData != null) {
                rowData = columnIndexInTemplateCursorMetaDataToColumnIndexInRequestCursorMetaDataConvertor.wrapAndBuildColumnIndexInTemplateCursorMetaDataToColumnIndexInRequestCursorMetaDataMapIfNeed(rowData);
                return rowData;
            } else {
                sortingCursorList.get(currentIndex).close(exceptionsWhenCloseSubCursor);
                columnIndexInTemplateCursorMetaDataToColumnIndexInRequestCursorMetaDataConvertor.reset();// [因为每一个cursor的column_index_mapping都不一样，所以这里把这个数据弄成空的]
                currentIndex++;
            }
        }
    }

    public List<RuntimeException> close(List<RuntimeException> RuntimeExceptionList) {
        RuntimeExceptionList.addAll(exceptionsWhenCloseSubCursor);
        for (ISortingCursor cursor : sortingCursorList) {
            RuntimeExceptionList = cursor.close(RuntimeExceptionList);
        }
        return RuntimeExceptionList;
    }

    public boolean skipTo(Record keyRecord) throws RuntimeException {
        init();
        return super.skipTo(keyRecord);
    }

    public List<ISortingCursor> getSortingCursorList() {
        return sortingCursorList;
    }

    public List<DuplicateValueRowDataLinkedList> mgetDuplicateValueRowDataLinkedListList(List<Record> keyRecordList, boolean prefixMatch, boolean keyFilterOrValueFilter) throws RuntimeException {
        log.error("do mgetWith Duplicatelist in mergeCursor. should not be here");
        init();
        Map<Record, DuplicateValueRowDataLinkedList> map = parentCursorMgetRecordToDuplicateValueRowDataLinkedListMap(//
                keyRecordList, //
                prefixMatch, //
                keyFilterOrValueFilter);//
        return new ArrayList<DuplicateValueRowDataLinkedList>(map.values());
    }

    /**
     * 批量获取
     *
     * @param keyRecordList
     * @param prefixMatch
     * @param keyFilterOrValueFilter 为true使用keyFilter，为false使用valueFilter
     * @return
     * @throws RuntimeException
     */
    public Map<Record, DuplicateValueRowDataLinkedList> mgetRecordToDuplicateValueRowDataLinkedListMap(List<Record> keyRecordList, boolean prefixMatch, boolean keyFilterOrValueFilter) throws RuntimeException {
        init();
        if (prefixMatch) {
            throw new UnsupportedOperationException("not supported yet");
        } else {
            // 这里列的别名也丢了吧 似乎解决了
            QueryWithIndex queryWithIndex = (QueryWithIndex) mergeQuery.getExecutePlan();

            BooleanFilter booleanFilter = ObjectCreateFactory.createBooleanFilter();
            booleanFilter.setOperation(Operation.in);
            booleanFilter.setValueList(new ArrayList<Object>());
            String columnName = null;
            for (Record record : keyRecordList) {
                Map<String, Object> map = record.getColumnNameToValueMap();
                if (map.size() == 1) {
                    // 单字段in
                    Entry<String, Object> entry = map.entrySet().iterator().next();
                    Object value = entry.getValue();
                    columnName = entry.getKey();
                    Column column = ObjectCreateFactory.createColumn();
                    column.setColumnName(columnName);
                    column.setType(record.getValueType(0));

                    column.setTableName(queryWithIndex.getAlias());
                    booleanFilter.setColumn(column);
                    booleanFilter.getValueList().add(value);
                } else {
                    // 多字段in
                    if (booleanFilter.getColumn() == null) {
                        booleanFilter.setColumn(buildRowFunction(map.keySet(), true, record));
                    }
                    booleanFilter.getValueList().add(buildRowFunction(map.values(), false, record));

                }
            }

            TableQueryWithIndex tableQueryWithIndex = (TableQueryWithIndex) OptimizerUtils.convertExecutePlanToNode(queryWithIndex);
            if (keyFilterOrValueFilter) {
                tableQueryWithIndex.indexQueryKeyFilter(DnfFilters.and(removeFilter(tableQueryWithIndex.getIndexQueryKeyFilter(), booleanFilter), booleanFilter));//同列值新范围替换
            } else {
                tableQueryWithIndex.resultFilter(DnfFilters.and(removeFilter(tableQueryWithIndex.getResultFilter(), booleanFilter), booleanFilter));
            }
            tableQueryWithIndex.build();
            // 优化做法，将数据分配掉。
            Integer threadNo = mergeQuery.getThreadNo();
            executeContext.getExtraCmds().put("initThread", threadNo);
            ExecutePlan executePlan = OptimizerContext.getOptimizerContext().getOptimizer().optimizeNodeAndToExecutePlanAndOptimizeExecutePlan(tableQueryWithIndex, executeContext.getParameters(), executeContext.getExtraCmds());
            ISortingCursor sortingCursor = null;
            Map<Record, DuplicateValueRowDataLinkedList> recordToDuplicateKeyValuePairMap = null;
            try {
                sortingCursor = DataNodeExecutorContext.getExecutorContext().getDataNodeExecutor().execute(executeContext, executePlan);
                // 用于关闭，统一管理
                this.returnColumnMetaDataList = sortingCursor.getColumnMetaDataList();
                //keyRecords列信息
                List<Column> columnList = new ArrayList<Column>();
                if (booleanFilter.getColumn() instanceof Column) {
                    columnList.add((Column) booleanFilter.getColumn());
                } else {
                    columnList.addAll(((Function) booleanFilter.getColumn()).getArgList());
                }
                recordToDuplicateKeyValuePairMap = buildRecordToDuplicateValueRowDataLinkedListMap(sortingCursor, columnList);
            } finally {
                if (sortingCursor != null) {
                    List<RuntimeException> exs = new ArrayList();
                    exs = sortingCursor.close(exs);
                    if (!exs.isEmpty()) {
                        throw exs.get(0);
                    }
                }
            }
            return recordToDuplicateKeyValuePairMap;
        }
    }

    private Function buildRowFunction(Collection collection, boolean isColumn, Record record) {
        Function function = ObjectCreateFactory.createFunction();
        function.setFunctionName(FunctionName.row);
        //列信息
        StringBuilder sb = new StringBuilder();
        sb.append('(').append(Joiner.on(",").join(collection)).append(')');
        function.setColumnName(sb.toString());
        //
        if (isColumn) {
            List<Column> columnList = new ArrayList<Column>(collection.size());
            for (Object value : collection) {
                Column column = ObjectCreateFactory.createColumn();
                column.setColumnName((String) value);
                column.setType(record.getValueType((String) value));
                columnList.add(column);
            }
            function.setArgList(columnList);
        } else {
            function.setArgList(new ArrayList(collection));
        }
        return function;
    }

    /**
     * @param inFilter 去除这个条件
     *                 合并两个条件去除重复的key条件，比如构造了id in (xxx)的请求后，原先条件中有可能也存在id的条件，这时需要替换原先的id条件
     */
    private Filter removeFilter(Filter srcFilter, BooleanFilter inFilter) {
        List<List<Filter>> filterListList = DnfFilters.toDnfFilterListList(srcFilter);
        List<List<Filter>> newFilterListList = new ArrayList<List<Filter>>();
        for (List<Filter> filterList : filterListList) {
            List<Filter> newFilterList = new ArrayList<Filter>();
            for (Filter filter : filterList) {
                Object column = Filters.getFilterColumn(filter);
                if (!column.equals(inFilter.getColumn())) {
                    newFilterList.add(filter);
                }
            }
            newFilterListList.add(newFilterList);
        }
        return DnfFilters.orDnfFilterListList(newFilterListList);
    }

    /**
     * 根据返回结果，创建重复值的kvpairMap
     */
    private Map<Record, DuplicateValueRowDataLinkedList> buildRecordToDuplicateValueRowDataLinkedListMap(ISortingCursor sortingCursor, List<Column> columnList) throws RuntimeException {

        Map<Record, DuplicateValueRowDataLinkedList> recordToDuplicateKeyValuePairMap = new HashMap<Record, DuplicateValueRowDataLinkedList>();
        RowValues rowData;
        int count = 0;
        List<Column> newColumnList = new ArrayList<Column>();
        List<ColumnMetaData> columnMetaDataList = new ArrayList<ColumnMetaData>();
        for (Column column : columnList) {
            Item copy = column.copy();
            copy.setTableName(null);
            columnMetaDataList.add(Utils.getColumnMetaData(copy));
            newColumnList.add((Column) copy);
        }
        RecordCodec recordCodec = RecordCodecFactory.newRecordCodec(columnMetaDataList);
        while ((rowData = sortingCursor.next()) != null) {
            rowData = Utils.fromRowDataToArrayRowData(rowData);
            Record keyRecord = recordCodec.newRecord();
            for (Column column : newColumnList) {
                Object value = Utils.getValue(rowData, column);
                keyRecord.put(column.getColumnName(), value);
            }
            DuplicateValueRowDataLinkedList duplicateValueRowDataLinkedList = recordToDuplicateKeyValuePairMap.get(keyRecord);
            if (duplicateValueRowDataLinkedList == null) {//第一次
                duplicateValueRowDataLinkedList = new DuplicateValueRowDataLinkedList(rowData);
                recordToDuplicateKeyValuePairMap.put(keyRecord, duplicateValueRowDataLinkedList);
            } else {// 追加
                while (duplicateValueRowDataLinkedList.next != null) {
                    duplicateValueRowDataLinkedList = duplicateValueRowDataLinkedList.next;
                }
                duplicateValueRowDataLinkedList.next = new DuplicateValueRowDataLinkedList(rowData);
            }
            count++;
            if (count >= sizeLimination) {// 保护。。。别太多了
                throw new IllegalArgumentException("size is more than limination " + sizeLimination);
            }
        }

        return recordToDuplicateKeyValuePairMap;
    }

    public void beforeFirst() throws RuntimeException {
        inited = false;
        init();
        for (int i = 0; i < sortingCursorList.size(); i++) {
            sortingCursorList.get(i).beforeFirst();
        }
    }

    public String toString() {
        return toStringWithInden(0);
    }

    public String toStringWithInden(int inden) {
        String tabTittle = GeneralUtil.getTab(inden);
        String tabContent = GeneralUtil.getTab(inden + 1);
        StringBuilder sb = new StringBuilder();

        GeneralUtil.printlnToStringBuilder(sb, tabTittle + "MergeCursor ");
        GeneralUtil.printAFieldToStringBuilder(sb, "addOrderByItemAndSetNeedBuild", this.orderByList, tabContent);

        for (ISortingCursor cursor : sortingCursorList) {
            sb.append(cursor.toStringWithInden(inden + 1));
        }
        return sb.toString();

    }

    public List<ColumnMetaData> getColumnMetaDataList() throws RuntimeException {
        if (this.returnColumnMetaDataList != null) {
            return this.returnColumnMetaDataList;
        }
        if (this.sortingCursorList != null && !sortingCursorList.isEmpty()) {
            this.returnColumnMetaDataList = sortingCursorList.get(0).getColumnMetaDataList();
        }
        return this.returnColumnMetaDataList;
    }
}
